from pyspark.sql import SparkSession,DataFrame

class MostRatedFilms:
    """Query that finds the ten movies with the largest number of ratings."""

    def run(
        self,
        moviesDataSet: DataFrame,
        ratingsDataSet: DataFrame,
        spark: SparkSession,
    ) -> DataFrame:
        """Return the ten most-rated movies, ranked by rating count.

        Side effect: registers (or replaces) the temp views ``movies`` and
        ``ratings`` for the two input DataFrames so the SQL below can
        reference them by name.

        Args:
            moviesDataSet: Movie data; the query reads its ``movieId`` and
                ``title`` columns.
            ratingsDataSet: Rating data; the query reads its ``movieId``
                column and counts one rating per row.
            spark: Session used to execute the SQL.
                NOTE(review): presumably the same session that owns both
                DataFrames, since temp views are looked up by name in the
                session running the query -- confirm against callers.

        Returns:
            A DataFrame with columns ``movieId``, ``title`` and
            ``cntRating``, sorted by ``cntRating`` descending (ties broken
            by ``movieId`` ascending). It can hold fewer than ten rows when
            there are fewer than ten rated movies, or when a top-rated
            ``movieId`` has no matching row in ``movies`` (inner join).
        """
        # Register the DataFrames as temp views so they can be queried by name.
        moviesDataSet.createOrReplaceTempView("movies")
        ratingsDataSet.createOrReplaceTempView("ratings")

        # The ORDER BY inside `rating_filter` only decides WHICH ten movies
        # survive the LIMIT; row order is not guaranteed to survive the join,
        # so the outer query sorts again. `movieId` is a tie-breaker that keeps
        # both the selection and the final ranking deterministic.
        sql = """
            with rating_cnt as(
                select movieId,count(1) as rating_cnt
                from ratings group by movieId
            ),rating_filter as(
                select movieId,rating_cnt
                from rating_cnt order by rating_cnt desc,movieId
                limit 10
            )
            select m.movieId,m.title,r.rating_cnt as cntRating
            from rating_filter r inner join movies m on r.movieId=m.movieId
            order by cntRating desc,m.movieId
        """
        return spark.sql(sql)
